{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "原文：<https://zhuanlan.zhihu.com/p/96969508>\n",
    "\n",
    "<small>发布于 2019-12-13</small>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 前言\n",
    "\n",
    "这篇文章我们再次回到协程。协程的技术实际上是非常重要的，比如微信的后台就大量的使用协程来进行并发量的提高，只用多线程和多进程很难低成本、高效地满足全球数十亿用户的各种操作请求。协程的底层实现可以定制的，微信团队就开源了一个他们实现的协程库。直接分析他们的实现比较困难，所以我们先分析python官方的协程库asyncio的底层实现。\n",
    "\n",
    "<https://github.com/Tencent/libco>\n",
    "\n",
    "这篇文章主要基于之前的两篇文章。在协程文章中，主要介绍了协程的理解和特点，在Redis单线程请求响应文章中，主要介绍了IO复用技术，以及Redis是如何利用IO复用来实现定时任务（TimeEvent）和IO任务（FileEvent）的调度。本篇在前两篇文章的基础上，来研究Python的协程调度到底是如何实现的。\n",
    "\n",
    "- <https://zhuanlan.zhihu.com/p/96048314>\n",
    "- <https://zhuanlan.zhihu.com/p/95840826>\n",
    "\n",
    "本篇假定读者对asyncio的使用以及python协程有基本的熟悉并主要涉及到如下的内容：\n",
    "\n",
    "- 内容回顾： 首先对上两篇文章进行一个简单的回顾，回顾对协程的理解和特点以及Redis事件驱动的实现\n",
    "- Python的asyncio框架中事件循环的实现和任务调度的整体流程\n",
    "- asyncio实现中的三个重要概念及协程调度：TimeHandler, Eventloop，Task\n",
    "- 以asyncio.sleep()调用和实现为例介绍\n",
    "\n",
    "# 内容回顾\n",
    "\n",
    "## 协程回顾\n",
    "我们曾总结协程具有两大特点\n",
    "\n",
    "- 可保留运行时的状态数据\n",
    "- 可出让自己的执行权，当重新获得执行权时从上一次暂停的位置继续执行\n",
    "\n",
    "总结而言就是协程相比较常规的函数，是可以被打断以及恢复执行流程的。我们可以将之理解为用户级的线程，内核级的线程的调度执行是由操作系统来决定负责的，而协程的调度执行是由线程来负责的。**需要再次提醒的一点是，正常情况下，同一个进程的线程之间是可以并行的（python多线程除外，具体详见专栏文章），但是无论怎么样，同一个线程内的各个协程不是并行的，只能并发执行，具体解释见专栏中的协程介绍文章。**\n",
    "\n",
    "![pic](../assets/thread2.jpg)\n",
    "\n",
    "既然协程的调度执行需要线程来负责，那么我们就需要实现一个调度器来实现协程的调度，**在asyncio中，eventloop我们可以认为就是一个协程调度器。**\n",
    "\n",
    "![pic](../assets/corotain.jpg)\n",
    "<center>调度器实现线程调度</center>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Redis事件（任务）调度\n",
    "\n",
    "事件驱动编程最重要的两个元素就是\n",
    "\n",
    "- 事件循环（即Redis中和asyncio中的eventloop）\n",
    "- 事件在事件循环的注册及发生事件时的回调函数绑定\n",
    "\n",
    "Redis事件调度中主要调度IO事件和定时事件，**每一个事件都会指定一个回调函数**，当该事件发生的时候执行指定回调函数。同时我们还介绍**IO事件的发生与否是托管给操作系统进行管理，但时间事件的管理是Redis自己来管理的**。Redis事件循环执行一次的逻辑如下，首先检查是否有时间事件的发生，然后根据即将最早发生的时间事件的时间戳与当前时间戳的比较来决定epoll调用（我们假定底层采用epoll）的等待时长，接着使用epoll系统调用从内核拿到已经发生的IO事件，然后处理已发生的文件事件和预定时间已经到达的定时事件。\n",
    "\n",
    "![pic](../assets/redis3.jpg)\n",
    "\n",
    "<center>Redis事件循环单次执行逻辑</center>\n",
    "\n",
    "## asyncio协程调度\n",
    "\n",
    "asyncio里协程的调度就是基于事件调度实现的。那么和Redis一样，asyncio中的事件分为IO事件（主要用于socket文件事件的监控）和定时事件，**asyncio的协程调度和Redis的事件调度二者从顶层逻辑看也是几乎一致的。**即都有一个事件循环，每次执行流程和上图也几乎一致。**但协程与函数最大的不同在于协程本身的执行流程是可以被打断的，那么如果一个协程的执行流程被打断了，该如何恢复其调度？**答案仍然是回调函数。接下来我们一步步展开asyncio是如何实现协程调度的，以及下面这些代码执行每一行时底层实现到底发生了什么。\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 一个协程执行流程被打断的最主要的原因就是在协程内部调用了另一个协程函数\n",
    "import asyncio\n",
    "import asyncio.coroutines\n",
    "\n",
    "\n",
    "def _set_result_unless_cancelled(fut, result):\n",
    "    if fut.cancelled():\n",
    "        return\n",
    "    print(\"this is a rewrited sleep callback function\")\n",
    "    fut.set_result(result)\n",
    "\n",
    "\n",
    "# asyncio.coroutine函数的实现版本\n",
    "@asyncio.coroutine\n",
    "def dalong_sleep(delay, loop, result=None):\n",
    "    \"\"\"Coroutine that completes after a given time (in seconds).\"\"\"\n",
    "    if delay == 0:\n",
    "        return result\n",
    "\n",
    "    future = loop.create_future()\n",
    "    h = future._loop.call_later(delay, _set_result_unless_cancelled, future,\n",
    "                                result)\n",
    "    try:\n",
    "        return (yield from future)\n",
    "    finally:\n",
    "        h.cancel()\n",
    "\n",
    "\n",
    "async def cor1():\n",
    "    await dalong_sleep(1, loop=event_loop)\n",
    "    print(\"this coroutine cor1\")\n",
    "\n",
    "\n",
    "def call_back(res):\n",
    "    print(\"this is cor1's callback fucntion\")\n",
    "\n",
    "\n",
    "# 获得一个事件循环\n",
    "event_loop = asyncio.get_event_loop()\n",
    "# 创建一个任务，并将任务加入事件循环\n",
    "task = event_loop.create_task(cor1())\n",
    "# 给任务添加回调函数\n",
    "task.add_done_callback(call_back)\n",
    "# 开始执行任务直到结束\n",
    "event_loop.run_until_complete(task)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "this is a rewrited sleep callback function\n",
    "this coroutine cor1\n",
    "this is cor1's callback fucntion\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## asyncio实现关键概念\n",
    "\n",
    "为了方便之后的源码分析，我们先介绍asyncio实现中的几个关键概念。**需要说明的是为了方便在说明，如下的代码并不是严格的asyncio底层的实现代码，我将不影响理解的代码全部进行了删除，只留了一些核心逻辑代码。强烈下面的代码认真阅读，代码加了详细的注释比较好懂，且对协程的调度理解非常有帮助。**\n",
    "\n",
    "### 核心概念1：TimeHandler\n",
    "```py\n",
    "class TimerHandle(Handle):\n",
    "   # 需要指明发生时间when以及时间到达后执行的回调函数callback\n",
    "    def __init__(self, when, callback, args, loop, context=None):\n",
    "        #必须指明预定时间\n",
    "        assert when is not None\n",
    "        super().__init__(callback, args, loop, context)\n",
    "        if self._source_traceback:\n",
    "            del self._source_traceback[-1]\n",
    "        self._when = when\n",
    "        self._scheduled = False\n",
    "   #执行回调函数\n",
    "   def _run(self):\n",
    "        try:\n",
    "            self._context.run(self._callback, *self._args)\n",
    "        except (SystemExit, KeyboardInterrupt)\n",
    "            #代码被我删除掉了\n",
    "        self = None  # Needed to break cycles when an exception occurs.\n",
    "        \n",
    "```        \n",
    "\n",
    "TimeHandler是时间事件的句柄，实际上就是一个时间事件上面封装了一层，支持指定事件发生的时间。实例化一个TimeHandler时，需要指明该事件指定的发生时间以及该事件的回调函数。当执行一个TimeHandler对象的_run()方法时，执行该时间事件的回调函数。\n",
    "\n",
    "### 核心概念2：Eventloop\n",
    "\n",
    "事件循环的概念我们应该不陌生，在Redis中每一次事件循环都会执行asProcessEvent()的函数，对应的在asyncio中，每一次事件循环我们都会执行_run_once()函数。**asyncio中所有就绪的任务/事件都放在self._ready中，所有待发生的时间事件都放在self._scheduled中进行存储管理，其中self._scheduled是一个按照时间事件的发生时间来构成的最小堆。**\n",
    "\n",
    "```py\n",
    "class BaseEventLoop(events.AbstractEventLoop):\n",
    "    def _run_once(self):\n",
    "\n",
    "            timeout = None\n",
    "            # 如果就绪队列已经有任务了，则跳过IO事件的获取\n",
    "            if self._ready or self._stopping:\n",
    "                timeout = 0\n",
    "            # 否则检查即将最早发生的定时事件发生时间，并根据发生时间决定IO事件等待的时长\n",
    "            elif self._scheduled:\n",
    "                # self._scheduled是一个最小堆\n",
    "                when = self._scheduled[0]._when\n",
    "                timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)\n",
    "            # 获取发生的IO事件，底层可采用epoll，poll或者select系统调用\n",
    "            event_list = self._selector.select(timeout)\n",
    "            # 处理所有的IO事件\n",
    "            self._process_events(event_list)\n",
    "\n",
    "            #获取所有该发生的定时事件\n",
    "            end_time = self.time() + self._clock_resolution\n",
    "            while self._scheduled:\n",
    "                handle = self._scheduled[0]\n",
    "                if handle._when >= end_time:\n",
    "                    break\n",
    "                handle = heapq.heappop(self._scheduled)\n",
    "                handle._scheduled = False\n",
    "                self._ready.append(handle)\n",
    "            \n",
    "            #运行就绪队列中的所有任务\n",
    "            ntodo = len(self._ready)\n",
    "            for i in range(ntodo):\n",
    "                handle = self._ready.popleft()\n",
    "                handle._run()\n",
    "            \n",
    "     #将定时任务加入到schduled队列中\n",
    "    def call_later(self, delay, callback, *args, context=None):\n",
    "       \n",
    "        timer = self.call_at(self.time() + delay, callback, *args,\n",
    "                             context=context)  \n",
    "        return timer\n",
    "\n",
    "     #将定时任务加入到schduled队列中\n",
    "    def call_at(self, when, callback, *args, context=None): \n",
    "        timer = events.TimerHandle(when, callback, args, self, context)\n",
    "        heapq.heappush(self._scheduled, timer)\n",
    "        timer._scheduled = True\n",
    "        return timer\n",
    "    #将定时任务直接加入到ready队列中，下一次执行run_once函数时被执行\n",
    "    def call_soon(self, callback, *args, context=None):\n",
    "       \n",
    "        handle = self._call_soon(callback, args, context)\n",
    "        return handle\n",
    "\n",
    "    def _call_soon(self, callback, args, context):\n",
    "        handle = events.Handle(callback, args, self, context)\n",
    "        self._ready.append(handle)\n",
    "        return handle\n",
    "```        \n",
    "\n",
    "每个任务加入到事件循环中时，会实例化一个对应的TimeHandler对象，ready和scheduled中存储的就是一个个TimeHandler对象。为了方便用户指定TimeHandler中的when参数，eventloop提供了三个接口让用户把时间事件加入到事件循环中，他们分别是**call_later(), call_at(), call_soon()**， 其中call_soon()是直接将回调函数加到ready对列中，然后在下一次run_once函数被执行时该回调函数被执行，而call_later和call_soon将定时任务加入到schduled中。asyncio中的eventloop会循环执行run_once函数，run_once函数的执行逻辑如下：\n",
    "\n",
    "1. 检查ready队列中是否有任务，**如果ready队列中已经有任务**，则设置timeout为0，不获取IO事件，然后获取所有预计发生时间小于当前时间的的时间事件加入到ready队列，并执行ready队列中的事件。\n",
    "2. 如果ready队列中没有任务，就根据最早发生的时间事件的时间与当前的时间的比较结果来确定self.selector.select()函数的超时时间。（不失一般性，我们可以认为self.selector.select()底层使用的是epoll系统调用）。如果获取到了IO事件，则首先进行IO事件的处理，然后获取该发生的时间事件加入到ready队列中，最后执行所有的ready队列中的任务。\n",
    "\n",
    "上述的代码流程我认为是比较简单易懂的，没什么特殊之处。核心的过程在于获取发生的事件，并执行事件的回调函数。**使用epoll系统调用中获得IO事件并执行其回调函数，从scheduled中获取时间事件，并执行其回调函数。需要注意的是，在run_once函数中，只有最后的部分才是执行回调函数的部分，前半部分做的各种事件检查都是检查事件的回调函数是否能加入到ready队列中，即最终只执行ready队列中的回调函数。**\n",
    "\n",
    "### 核心概念3：Task\n",
    "\n",
    "Task是协程调度的核心实现，**所有的协程都是一个Task**，对协程的执行、挂起、切换、恢复执行等都是在Task中进行。Task的实现代码中集合核心方法如下，代码我都加了详细的注释，还比较易懂。**还是需要重申一遍的是，下面的代码并不是严格asyncio底层实现代码，为了方便介绍，我将Python的实现源码的很多部分进行了删除，以及将Task从Future类继承的方法加入到了Task类的方法中。**\n",
    "\n",
    "```py\n",
    "class Task(futures._PyFuture):\n",
    "\n",
    "    \"\"\"A coroutine wrapped in a Future.\"\"\"\n",
    "\n",
    "\n",
    "    def __init__(self, coro, *, loop=None, name=None):\n",
    "        super().__init__(loop=loop)\n",
    "        # Task必须用协程作为参数进行实例化\n",
    "        if not coroutines.iscoroutine(coro):\n",
    "            raise TypeError(f\"a coroutine was expected, got {coro!r}\")\n",
    "\n",
    "        #设置一些类的属性\n",
    "        self._must_cancel = False\n",
    "        self._fut_waiter = None\n",
    "        self._coro = coro\n",
    "        self._context = contextvars.copy_context()\n",
    "        #将Task 执行函数加入到事件循环的ready队列中\n",
    "        self._loop.call_soon(self.__step, context=self._context)\n",
    "\n",
    "    def __step(self, exc=None):\n",
    "\n",
    "        coro = self._coro\n",
    "        self._fut_waiter = None\n",
    "        try:\n",
    "\t    # 通过执行协程的send方法\n",
    "            result = coro.send(None)\n",
    "        except StopIteration as exc:\n",
    "\t\t# 协程执行完毕会raise一个StopIteration的异常\n",
    "\t\t# 如果执行协程执行完毕，则将设置执行的结果\n",
    "                super().set_result(exc.value)\n",
    "        else:\n",
    "\t    #执行到这一步说明协程中使用了await/yield/yield from\n",
    "\t    #否则协程是不会被打断的，会直接执行完\n",
    "            blocking = getattr(result, '_asyncio_future_blocking', None)\n",
    "\t    # 如果blocking不是None，说明result不是None，此时说明\n",
    "            if blocking is not None:\n",
    "\t\t#说明此时使用了await和yield from等待另一个线程\n",
    "\t\t#否则result不会有_asyncio_future_blocking属性\n",
    "                if blocking:\n",
    "\t            # 不允许协程await/yield from自己，否则陷入循环await了\n",
    "                    if result is self:\n",
    "                        new_exc = RuntimeError(\n",
    "                            f'Task cannot await on itself: {self!r}')\n",
    "                        self._loop.call_soon(\n",
    "                            self.__step, new_exc, context=self._context)\n",
    "\t\t    # await/yield from了别的协程函数\n",
    "                    else:\n",
    "                        result._asyncio_future_blocking = False\n",
    "\t\t\t# 把当前协程的_wakeup函数设置为等待协程的回调函数(对方结束后，把我唤醒）\n",
    "                        result.add_done_callback(\n",
    "                            self.__wakeup, context=self._context)\n",
    "                        self._fut_waiter = result\n",
    " \n",
    "\n",
    "            elif result is None:\n",
    "                # 当该协程函数中出现一行语句：yield，这时相当于放弃一次执行权\n",
    "\t\t# 然后再把step函数加入到ready队列中\n",
    "                self._loop.call_soon(self.__step, context=self._context)\n",
    "\t\t\t\t\n",
    "\n",
    "    def __wakeup(self, future):\n",
    "        #再次执行__step函数\n",
    "        self.__step()\n",
    "\t\n",
    "    \"\"\"\n",
    "    如下的方法都是Future类中的方法，由于Task继承Future类，且为减少篇幅，方便说明\n",
    "    将这些方法写到了这里\n",
    "    \"\"\"\n",
    "    def add_done_callback(self, fn, *, context=None):\n",
    "\t#将回调函数加入到回调函数列表中\n",
    "        if context is None:\n",
    "            context = contextvars.copy_context()\n",
    "        self._callbacks.append((fn, context))\n",
    "\t\n",
    "    def set_result(self, result):\n",
    "\t#设置result属性，视为协程结束，并调度所有的回调函数, \n",
    "        self._result = result\n",
    "        self._state = _FINISHED\n",
    "        self.__schedule_callbacks()\n",
    "\t\t\n",
    "    def __schedule_callbacks(self):\n",
    "\t#将所有的回调函数加入到事件循环的ready对列中\n",
    "        callbacks = self._callbacks[:]\n",
    "        for callback, ctx in callbacks:\n",
    "            self._loop.call_soon(callback, self, context=ctx)\n",
    "```            "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 测试\n",
    "import asyncio\n",
    "\n",
    "\n",
    "async def foo(sleep_time):\n",
    "    print('start foo')\n",
    "    await asyncio.sleep(sleep_time)\n",
    "    return 123\n",
    "\n",
    "\n",
    "def foo2():\n",
    "    print('start')\n",
    "    yield\n",
    "    return 123\n",
    "\n",
    "async def test():\n",
    "    print('start')\n",
    "    result = await foo(1)\n",
    "#     print('step1')\n",
    "#     result = await foo(1)\n",
    "#     print('step2')\n",
    "#     result = await foo(2)\n",
    "#     print('step3')\n",
    "    return result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<coroutine object foo at 0x00000168E862A9C8>"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "t = foo(0)\n",
    "# t = foo(1)\n",
    "t"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "t = test()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "start\n",
      "start foo\n",
      "<Future pending>\n"
     ]
    }
   ],
   "source": [
    "try:\n",
    "    result = t.send(None)\n",
    "    print(result)\n",
    "except StopIteration as exc:\n",
    "    print('result:', exc.value)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "getattr(result, '_asyncio_future_blocking', None)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在Task中实际上完成了协程函数的执行、挂起、切换、结束之后调用其回调函数。**对于一个Task，很重要的一点在于可以对其加入回调函数，即该Task的协程运行结束后，会调度执行所有的回调函数。**我们按照一个协程被创建为一个Task到Task执行完毕的执行过程来分析上述的代码。\n",
    "\n",
    "1. **__init__()**函数：使用协程创建一个Task，我们命名为Task1，同时在__init__函数的最后，将Task1的__step()函数加入了事件循环的就绪队列中，下一次执行run_once函数时，这个_step函数就会被执行。\n",
    "2. **__step()**函数：实际上为一个Task的执行函数, 首先会调用当前协程的send()函数进行当前协程的执行\n",
    "3. **如果当前协程中没有使用yield/yield from/await，则会顺利的执行完毕（因为不会被打断）**。协程执行结束时会raise一个stopIteration的异常，__step()中捕获到后，就执行Task1的结果设置函数set_result()\n",
    "    - set_result(): 在set_result()函数中，设置了result结果，同时执行了所有回调函数的调度函数。（可以对一个Task对象重复调用add_done_callback()加入多个回调函数，这些回调函数被放在一个列表中维护。）\n",
    "    - __schedule_callbacks()：在回调函数调度中，将回调函数列表中的所有的回调函数加入到事件循环的ready队列中，下次run_once函数被运行时，这些回调函数会被执行。\n",
    "4. **如果当前协程中使用了yield from或者await等待另一个协程的执行完毕**，则就把当前Task的唤醒函数_wakeup()加入到被执行协程的回调函数\n",
    "    - _wakeup()：当被等待写成执行完毕，执行其回调函数wakeup时，wakeup函数重新执行step函数\n",
    "    \n",
    "我们发现有了Task类的辅助，协程的运行、切换、挂起、唤醒变得非常的容易实现和理解。即\n",
    "\n",
    "1. 如果一个Task的协程正常执行完，我们就设置Task的结果属性，然后执行Task的回调函数。\n",
    "2. 如果该Task的协程由于中间使用了yield/yield from/ await从而被打断了执行流程，则将当前**Task的协程的唤醒函数作为被调用的协程所属的Task的回调函数(对方结束后，主动把我唤醒）**。在被调用协程顺利执行完毕后，按照情况1的执行流程，当前Task的唤醒函数会被作为回调函数执行，从而又可以继续执行当前的协程。\n",
    "\n",
    "这基本就是协程切换、执行的核心内容了，应该还是比较好懂的。接下来我们以一段示例代码来将上面的所有内容串起来，研究每一步底层到底都执行了什么。\n",
    "\n",
    "**以asyncio.sleep()调用为例**\n",
    "\n",
    "本节我们以asyncio.sleep()调用为例，来追踪每一步到底发生了什么，调用官方的代码应该是下面的代码这样。但是为了更好的研究，我将asyncio.sleep()函数的实现进行了少量的修改，把实现代码都放在同一个代码文件中，所以我们跳过这个代码片段，直接研究下一个代码片段。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 274,
   "metadata": {},
   "outputs": [],
   "source": [
    "#一个协程执行流程被打断的最主要的原因就是在协程内部调用了另一个协程函数\n",
    "import asyncio\n",
    "\n",
    "\n",
    "async def cor1():\n",
    "    await asyncio.sleep(1)\n",
    "    print(\"this coroutine cor1\")\n",
    "\n",
    "def call_back(res):\n",
    "    print(\"this is cor1's callback fucntion\")\n",
    "    \n",
    "# 获得一个事件循环\n",
    "event_loop = asyncio.get_event_loop()\n",
    "# 创建一个任务，并将任务加入事件循环\n",
    "task = event_loop.create_task(cor1())\n",
    "# 给任务添加回调函数\n",
    "task.add_done_callback(call_back)\n",
    "# 开始执行任务直到结束\n",
    "event_loop.run_until_complete(task)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们按照整个函数的执行流程来研究其每一步到底发生了什么。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 代码片段1\n",
    "#一个协程执行流程被打断的最主要的原因就是在协程内部调用了另一个协程函数\n",
    "import asyncio\n",
    "import asyncio.coroutines\n",
    "\n",
    "\n",
    "def _set_result_unless_cancelled(fut, result):\n",
    "    if fut.cancelled():\n",
    "        return\n",
    "    print(\"this is a rewrited sleep callback function\")\n",
    "    fut.set_result(result)\n",
    "\n",
    "\n",
    "# asyncio.sleep()函数的实现版本\n",
    "@asyncio.coroutine\n",
    "def dalong_sleep(delay, result=None, *, loop=None):\n",
    "    \"\"\"Coroutine that completes after a given time (in seconds).\"\"\"\n",
    "    #如果delay=0， 直接yield\n",
    "    if delay == 0:\n",
    "        yield\n",
    "        return result\n",
    "\n",
    "    future = loop.create_future()\n",
    "    h = future._loop.call_later(delay,\n",
    "                                _set_result_unless_cancelled,\n",
    "                                future, result)\n",
    "    try:\n",
    "        # 遇到yield from，则当前协程的执行流程被打断\n",
    "        return (yield from future)\n",
    "    finally:\n",
    "        h.cancel()\n",
    "\n",
    "\n",
    "async def cor1():\n",
    "    # 遇到await，当前协程执行流程被打断，当前Task的唤醒函数被设置为dalong_sleep的Task的\n",
    "    # 的回调函数\n",
    "    await dalong_sleep(1, loop = event_loop)\n",
    "    print(\"this coroutine cor1\")\n",
    "\n",
    "\n",
    "def call_back(res):\n",
    "    print(\"this is cor1's callback fucntion\")\n",
    "# 获得一个事件循环\n",
    "event_loop = asyncio.get_event_loop()\n",
    "# 创建一个任务，并将任务加入事件循环\n",
    "task = event_loop.create_task(cor1())\n",
    "# 给任务添加回调函数\n",
    "task.add_done_callback(call_back)\n",
    "# 开始执行任务直到结束\n",
    "event_loop.run_until_complete(task)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1. 首先通过get_event_loop()获得一个事件循环 event_loop。\n",
    "\n",
    "2. 使用协程cor1()创建一个Task对象（为方便指代，我们命名为Task1） **在Task1的创建函数的最后，Task1的_step函数被加入到事件循环的ready队列中，**进而在下一次run_once函数被执行时Task1的协程会被调度执行。(asyncio.create_task()就是执行Task实例化，然后返回Task对象，Task对象初始化时最后一步将step函数加入到事件循环的ready队列中)\n",
    "\n",
    "3. 对Task1注册一个回调函数call_back()，当Task1执行结束后，在set_result()函数中会将所有的回调函数加入到事件循环的ready队列中。\n",
    "4. 开始执行事件循环，每次事件循环都会执行run_once函数。进入到run_once函数中，首先由于第2步在ready队列中加入了Task的step函数，所以开始执行step函数，而step函数会通过协程的send()函数触发cor1()协程的执行。\n",
    "\n",
    "5. cor1()函数中遇到await，于是将cor1对应的Task1的wakeup函数加入到dalong_sleep()协程对应的Task的回调函数中（将dalong_sleep()对应的Task命名为Task2。）\n",
    "6. **如果调用dalong_sleep时delay参数为0。**\n",
    "\n",
    "    1. 则dalong_sleep中直接调用yield，dalong_sleep执行流程被打断，但按照step函数中的逻辑，由于Task2并没有等待任何其他的协程执行完毕，所以Task2的step函数会被重新加入到事件循环的ready队列中，然后再下一次run_once函数被执行时再次执行Task2的step函数。\n",
    "    \n",
    "    2. 当dalong_sleep协程被再次继续执行时，就从yield的下一句开始执行，由于紧接着就是return语句，所以协程执行完毕正常退出并执行其set_result函数。在该函数中设置结果并执行回调函数即Task1的wakeup函数，于是Task1被继续执行。\n",
    "    \n",
    "    3. Task1的step函数被继续执行，在step函数中使用send()恢复cor1协程的执行流程。由于后续没有遇到任何的yield/await/yield from，所以协程顺利执行到结束。cor1结束时，调用Task1的set_result函数，设置result属性并将其回调函数加入到事件循环的ready队列中。**在下一次的run_once函数被执行时，其回调函数被执行。**\n",
    "    \n",
    "7. **如果delay不为0。**\n",
    "\n",
    "    1. dalong_sleep中首先创建一个future对象（**不影响理解，可以近似认为就是创建了一个Task对象，我们将之命名为Task3虽然实现上Future为Task的父类**）。指定其发生的时间为delay时长之后的时间戳，然后将其加入到事件循环的schedule堆中，并绑定回调函数为_set_result_unless_cancelled， 在该时间事件发生后该回调函数被执行。\n",
    "\n",
    "    2. 继续向下执行遇到yield from等待Task3，按照协程的切换执行流程。会将Task2的唤醒函数wakeup注册为Task3的回调函数。此时Task3有两个回调函数，一个是set_result_unless_cancelled，一个是Task2的唤醒函数。\n",
    "\n",
    "    3. 当睡眠的时间到达后，在run_once函数中就会执行Task3的回调函数。执行Task2的唤醒函数之后，Task2可以被继续执行。\n",
    "    \n",
    "    4. Task2遇到dalong_sleep协程中的return语句，故Task2执行结束，调用其set_result函数并将其回调函数加入到事件循环的ready队列中，Task2的其中一个回调函数就是Task1的唤醒函数\n",
    "\n",
    "    5. Task1被唤醒之后被继续执行到结束。然后将其回调函数加入到ready队列中\n",
    "\n",
    "    6. Task1的回调函数被执行。\n",
    "\n",
    "# 总结\n",
    "本篇中我们主要介绍了python的asyncio中协程的调度的底层实现，我们发现asyncio事件循环的run_once函数的执行逻辑与Redis事件循环的aeProcessEvent()函数几乎一致。与Redis不同的是，在协程的调度执行中，我们需要关注协程被打断执行流之后如何恢复其执行。而在asyncio的实现中我们也发现：**asyncio借助了Task类，通过将当前协程所对应的Task的唤醒函数设置为被调用协程的回调函数从而实现了协程的切换、挂起以及唤醒。**最后我们通过一个函数代码示例来详细的分析函数的每一步中协程调度底层到底发生了什么，示例代码虽然较短，却也几乎包含了所有协程中常用的概念和使用方法，希望这篇文章对大家更深入的了解协程有所帮助。**当然了，协程的调度我们是研究清楚了，但是协程还有一个最关键的特点是：可以被打断执行流程和恢复执行流程。那么如何实现在协程被打断时保存上下文，并在下一次恢复执行时再恢复执行流程？**这个内容就会在后续的博客介绍。\n",
    "\n",
    "# 后记\n",
    "最近的两篇博客都涉及到比较多的源码分析，之后的文章应该会主要交替进行。一些文章专注于一些系统设计、概念理解的内容，一些博客专注于这些系统和概念背后的具体实现代码。\n",
    "\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
